-
Notifications
You must be signed in to change notification settings - Fork 232
Add a callback parameter to produced messages, called upon receipt #506
base: master
Are you sure you want to change the base?
Conversation
Rather than iterate over a delivery report, an optional callback per message is used that gets called when the message is marked as delivered. This makes it easier to attach behaviors to particular messages without having to correlate message bodies in a delivery report.
Thanks @mikepk. I like this idea, and I think you've implemented it in a reasonable way that doesn't overly complicate the interface. Might it be useful for the callback function to accept the Some integration tests for the callback functions would be good as well, even if it's just a single test making sure that the function is called properly. |
i like the callback interface. two thoughts. I am personally more interested when the message fails to be delivered so i would also push for a failure_callback as well. second, make sure you catch any exception and bind it and allow the thread to report the exception. here is a pr that does that for the rebalance callback. |
When I have some spare time I'll go ahead and look into a failure_callback, exception, and tests. For failure callback are you thinking a call signature like |
I'd advocate for not adding more than a single callback. Instead of a success callback and an error callback, we could easily get by with a single callback taking a parameter that indicates whether there was an error - maybe the exception instance. |
Just to weigh in: a callback is interesting, and +1 to having a single callback that gets intuitive parameters for whether the message was successful. |
This could use another look and some tests if you're still interested in getting it into master, @mikepk. Thanks again for the contribution! |
Rather than iterate over a delivery report, an optional callback per message
is used that gets called when the message is marked as delivered. This
makes it easier to attach behaviors to particular messages without having to
correlate message bodies in a delivery report.
A sample use case is some kind of cleanup code that should only be run when a message has been guaranteed delivered. For example, a data / message bridge where messages can be dequeued (or acknowledged) from a master queue only when delivery is guaranteed in Kafka to avoid data loss. In my case, a rabbitMQ message broker is being replicated into Kafka. The message in Rabbit should only be ack'd when delivery is guaranteed otherwise there could be data loss.
This could be done by creating a registry of all sent messages and their cleanup code, then iterating over the delivery report and correlating the message with the registry, but that gets complicated and messy fast. This code uses the existing record of produced messages in pykafka (the message set / message_batch) and adds an optional callback to each message. When the delivery check is done, the callback attached to the message is executed if there is one. This keeps the async nature of the delivery check but allows attaching behaviors to the delivery check.